package com.udf.flink.scala.apitest.watermark

import java.text.SimpleDateFormat

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.mutable.ArrayBuffer
import scala.util.Sorting

//### 概念
//#Watermark机制
//#通常情况下由于网络或者系统等外部因素影响下，事件数据往往不能及时传输至FLink系统中，导致系统的不稳定而造成数据乱序到达或者延迟达到等问题，因此需要有一种机制能够控制数据处理的进度。
//#具体来讲，在创建一个基于时间的window后，需要确定属于该window的数据元素是否已经全部到达，确定后才可以对window中的所有数据做计算处理（如汇总、分组），如果数据并没有全部到达，则继续等待该窗口的数据全部到达后再开始计算。
//#但是对于但是对于late element，我们又不能无限期的等下去，必须要有个机制来保证一个特定的时间后，必须触发window去进行计算了。在这种情况下就需要用到水位线 (Watermark) 机制。

//### 作用
//#它能够衡量数据处理进度，保证事件数据全部到达Flink系统，即使数据乱序或者延迟到达，也能够像预期一样计算出正确和连续的结果。通常watermark是结合window来实现。

//### 原理
//#在 Flink 的窗口处理过程中，如果确定全部数据到达，就可以对 Window 的所有数据做窗口计算操作（如汇总、分组等），如果数据没有全部到达，则继续等待该窗口中的数据全部到达才开始处理。
//#这种情况下就需要用到水位线（WaterMarks）机制，它能够衡量数据处理进度（表达数据到达的完整性），保证事件数据（全部）到达Flink系统，或者在乱序及延迟到达时，也能够像预期一样计算出正确并且连续的结果。
//#当任何 Event 进入到 Flink 系统时，会根据当前最大事件时间产生 Watermarks 时间戳。

//#那么 Flink 是怎么计算 Watermak 的值呢？
//Watermark = 进入 Flink 的最大的事件时间（maxEventTime）— 指定的延迟时间（t）

//#那么有 Watermark 的 Window 是怎么触发窗口函数的呢？
//1.watermark >= window的结束时间
//2.该窗口必须有数据  注意：[window_start_time,window_end_time) 中有数据存在，前闭后开区间
//#注意：Watermark 本质可以理解成一个延迟触发机制。

//### 使用
//#Watermark 的使用存在三种情况
//#### 有序流
//#有序的数据流中的watermark
//如果数据元素的事件时间是有序的，Watermark 时间戳会随着数据元素的事件时间按顺序生成，此时水位线的变化和事件时间保持一致（因为既然是有序的时间，就不需要设置延迟了，那么 t 就是 0。所以 watermark=maxtime-0 = maxtime），也就是理想状态下的水位线。当 Watermark 时间大于 Windows 结束时间就会触发对 Windows 的数据计算，以此类推， 下一个 Window 也是一样。

//#### 乱序流
//#乱序的数据流watermark
//现实情况下数据元素往往并不是按照其产生顺序接入到 Flink 系统中进行处理，而频繁出现乱序或迟到的情况，这种情况就需要使用 Watermarks 来应对。比如下图，设置延迟时间t为2。

//#### 并行流
//#并行数据流中的 Watermark
//在多并行度的情况下，Watermark 会有一个对齐机制，这个对齐机制会取所有 Channel 中最小的 Watermark。
class WaterMark {

}

class MyWindowFunction extends WindowFunction[(String, Long), String, Tuple, TimeWindow] {
  override def apply(key: Tuple, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]): Unit = {

    val keyStr = key.toString
    val arrBuff = ArrayBuffer[Long]()
    val ite = input.iterator

    while (ite.hasNext) {
      val tup2 = ite.next()
      arrBuff.append(tup2._2)
    }

    val arr = arrBuff.toArray
    Sorting.quickSort(arr)
    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")

    out.collect("聚合数据key为：" + keyStr
      + "，窗口中数据条数为" + arr.length
      + "，窗口中第一条数据：" + sdf.format(arr.head)
      + "，窗口中最后一条数据：" + sdf.format(arr.last)
      + "，窗口起始时间：" + sdf.format(window.getStart)
      + "，窗口结束时间：" + sdf.format(window.getEnd)

    )

  }
}
